/*
 * Copyright (c) Facebook, Inc. and its affiliates.
 *
 * Licensed under the Apache License Version 2.0 with LLVM Exceptions
 * (the "License"); you may not use this file except in compliance with
 * the License. You may obtain a copy of the License at
 *
 *   https://llvm.org/LICENSE.txt
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
#pragma once

#include <unifex/bind_back.hpp>
#include <unifex/execution_policy.hpp>
#include <unifex/get_execution_policy.hpp>
#include <unifex/receiver_concepts.hpp>
#include <unifex/sender_concepts.hpp>
#include <unifex/tag_invoke.hpp>

#include <unifex/detail/prologue.hpp>

namespace unifex {

namespace _bulk_join {

// Wrapper template whose nested `type` is the receiver class used by
// bulk_join. `type` is only declared here; its definition follows later in
// this file.
template <typename Receiver>
struct _join_receiver {
  class type;
};

// Shorthand for the nested receiver class `_join_receiver<Receiver>::type`.
template <typename Receiver>
using join_receiver = typename _join_receiver<Receiver>::type;

// Receiver adaptor that owns a downstream `Receiver`.
//
//  - set_next() takes no arguments and does nothing.
//  - set_value / set_error / set_done are passed on to the wrapped receiver,
//    which is handed over as an rvalue.
//  - get_execution_policy always answers parallel_unsequenced_policy.
//  - Any other receiver query CPO is answered by the wrapped receiver.
template <typename Receiver>
class _join_receiver<Receiver>::type {
public:
  // Builds the wrapped receiver from `r`; only participates when `Receiver`
  // is constructible from `Receiver2`.
  template(typename Receiver2)  //
      (requires constructible_from<
          Receiver,
          Receiver2>)  //
      explicit type(Receiver2&& r) noexcept(
          std::is_nothrow_constructible_v<Receiver, Receiver2>)
    : receiver_(static_cast<Receiver2&&>(r)) {}

  // Intentionally empty: nothing is forwarded downstream from here.
  void set_next() & noexcept {}

  // Completes the wrapped receiver with `values...`.
  template(typename... Values)                     //
      (requires receiver_of<Receiver, Values...>)  //
      void set_value(Values&&... values) noexcept(
          is_nothrow_receiver_of_v<Receiver, Values...>) {
    unifex::set_value(
        std::move(receiver_), static_cast<Values&&>(values)...);
  }

  // Completes the wrapped receiver with `error`.
  template(typename Error)                  //
      (requires receiver<Receiver, Error>)  //
      void set_error(Error&& error) noexcept {
    unifex::set_error(std::move(receiver_), static_cast<Error&&>(error));
  }

  // Completes the wrapped receiver with the "done" signal.
  void set_done() noexcept {
    unifex::set_done(std::move(receiver_));
  }

  // The execution policy reported by this receiver does not depend on the
  // receiver instance, hence the unnamed parameter.
  friend constexpr unifex::parallel_unsequenced_policy tag_invoke(
      tag_t<get_execution_policy>, const type&) noexcept {
    return unifex::parallel_unsequenced_policy{};
  }

  // Forwards every receiver query CPO to the wrapped receiver. `Self` is a
  // template parameter constrained to be exactly `type`.
  template(typename CPO, typename Self)  //
      (requires is_receiver_query_cpo_v<CPO> AND same_as<
          Self,
          type>)  //
      friend auto tag_invoke(CPO cpo, const Self& self) noexcept(
          std::is_nothrow_invocable_v<CPO, const Receiver&>)
          -> std::invoke_result_t<CPO, const Receiver&> {
    return cpo(self.receiver_);
  }

private:
  // The downstream receiver that all completion signals are passed to.
  Receiver receiver_;
};

// Wrapper template whose nested `type` is the sender class returned by
// bulk_join's default implementation. `type` is only declared here; its
// definition follows later in this file.
template <typename Source>
struct _join_sender {
  class type;
};

// Shorthand for the nested sender class `_join_sender<Source>::type`.
template <typename Source>
using join_sender = typename _join_sender<Source>::type;

// Sender adaptor that owns a `Source` sender.
//
// Its value types, error types, sends_done, blocking and
// is_always_scheduler_affine traits are taken unchanged from `Source`.
// Connecting it to a receiver connects the stored source to a
// `join_receiver` wrapping that receiver.
template <typename Source>
class _join_sender<Source>::type {
public:
  template <
      template <typename...>
      class Variant,
      template <typename...>
      class Tuple>
  using value_types = sender_value_types_t<Source, Variant, Tuple>;

  template <template <typename...> class Variant>
  using error_types = sender_error_types_t<Source, Variant>;

  static constexpr bool sends_done = sender_traits<Source>::sends_done;

  static constexpr blocking_kind blocking = sender_traits<Source>::blocking;

  static constexpr bool is_always_scheduler_affine =
      sender_traits<Source>::is_always_scheduler_affine;

  // Builds the stored source from `s`.
  //
  // Constrained (like the join_receiver constructor) so that this forwarding
  // constructor only participates when `Source` can actually be built from
  // `Source2`; otherwise it would be selected ahead of the copy constructor
  // for non-const lvalues of this class and then fail to compile.
  template(typename Source2)  //
      (requires constructible_from<
          Source,
          Source2>)  //
      explicit type(Source2&& s) noexcept(
          std::is_nothrow_constructible_v<Source, Source2>)
    : source_((Source2 &&) s) {}

  // connect() customisation: wraps `r` in a join_receiver and connects the
  // stored source (forwarded with the value category of `self`) to it.
  //
  // The noexcept condition covers both steps of the body: constructing the
  // stored receiver from `Receiver&&` (the join_receiver constructor is
  // noexcept exactly when that construction is) and connecting the source.
  template(typename Self, typename Receiver)  //
      (requires same_as<remove_cvref_t<Self>, type> AND sender_to<
          member_t<Self, Source>,
          join_receiver<remove_cvref_t<Receiver>>>)  //
      friend auto tag_invoke(
          tag_t<unifex::connect>,
          Self&& self,
          Receiver&& r) noexcept(
          std::is_nothrow_constructible_v<remove_cvref_t<Receiver>, Receiver>&&
              is_nothrow_connectable_v<
                  member_t<Self, Source>,
                  join_receiver<remove_cvref_t<Receiver>>>)
          -> connect_result_t<
              member_t<Self, Source>,
              join_receiver<remove_cvref_t<Receiver>>> {
    return unifex::connect(
        static_cast<Self&&>(self).source_,
        join_receiver<remove_cvref_t<Receiver>>{static_cast<Receiver&&>(r)});
  }

  // blocking() customisation: answers with whatever the stored source
  // reports.
  friend constexpr blocking_kind
  tag_invoke(tag_t<unifex::blocking>, const type& s) noexcept {
    return unifex::blocking(s.source_);
  }

private:
  // The wrapped sender that does the actual work.
  Source source_;
};

// Function-object type with three call operators:
//  1. a bulk sender for which a tag_invoke customisation of `_fn` exists:
//     the call is dispatched to that customisation;
//  2. a bulk sender without such a customisation: the sender is wrapped in a
//     join_sender;
//  3. no arguments: the result of bind_back(*this) is returned.
struct _fn {
  // (1) Customised: defer entirely to the user-provided tag_invoke overload.
  template(typename Source)                                        //
      (requires bulk_sender<Source>&& tag_invocable<_fn, Source>)  //
      auto
      operator()(Source&& source) const
      noexcept(is_nothrow_tag_invocable_v<_fn, Source>)
          -> tag_invoke_result_t<_fn, Source> {
    return tag_invoke(_fn{}, static_cast<Source&&>(source));
  }

  // (2) Default: store `source` (with cv/ref qualifiers stripped from its
  // type) inside a join_sender.
  template(typename Source)                                            //
      (requires bulk_sender<Source> && (!tag_invocable<_fn, Source>))  //
      auto
      operator()(Source&& source) const
      noexcept(std::is_nothrow_constructible_v<remove_cvref_t<Source>, Source>)
          -> join_sender<remove_cvref_t<Source>> {
    using sender_t = join_sender<remove_cvref_t<Source>>;
    return sender_t{static_cast<Source&&>(source)};
  }

  // (3) Nullary form: hands this function object to bind_back.
  // NOTE(review): presumably this is what enables `sender | bulk_join()`
  // pipe syntax - confirm against bind_back's documentation.
  constexpr auto operator()() const
      noexcept(std::is_nothrow_invocable_v<tag_t<bind_back>, _fn>)
          -> bind_back_result_t<_fn> {
    return bind_back(*this);
  }
};

}  // namespace _bulk_join

// The `bulk_join` function object: a single inline constexpr instance of
// `_bulk_join::_fn`, shared by every translation unit that includes this
// header.
inline constexpr _bulk_join::_fn bulk_join{};

}  // namespace unifex

#include <unifex/detail/epilogue.hpp>
